package cn.zhaopin.starter.mq.common;

import cn.zhaopin.starter.mq.constant.MQConstant;
import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;

import javax.validation.constraints.NotNull;
import java.util.HashMap;
import java.util.Map;

/**
 * Description: fluent builder for TDMQ (Pulsar) messages.
 *
 * <p>Collects a payload and a set of headers and hands them to
 * {@link PulsarMessage} in {@link #build()}. Instances are mutable and not
 * synchronized.
 *
 * @author: zuomin (myleszelic@outlook.com)
 * @date: 2021/07/22-17:07
 */
public class PulsarMessageBuilder {

    /** Message body passed to the built {@link PulsarMessage}; may be {@code null}. */
    private Object payload;

    /** Headers accumulated so far; replaced wholesale by {@link #copyHeaders(Map)}. */
    private Map<String, Object> headers = new HashMap<>();

    /**
     * Creates a builder for the given payload with no headers.
     *
     * @param payload the message body
     */
    public PulsarMessageBuilder(Object payload) {
        this.payload = payload;
    }

    /**
     * Static factory equivalent to {@code new PulsarMessageBuilder(payload)}.
     *
     * @param payload the message body
     * @return a new builder holding {@code payload}
     */
    public static PulsarMessageBuilder withPayload(Object payload) {
        return new PulsarMessageBuilder(payload);
    }

    /**
     * Replaces the payload held by this builder.
     *
     * @param payload the new message body
     * @return this builder, for chaining
     */
    public PulsarMessageBuilder buildPayload(Object payload) {
        this.payload = payload;
        return this;
    }

    /**
     * Replaces this builder's headers with a copy of the given map, skipping
     * every header for which {@link #isReadOnly(String)} returns {@code true}.
     *
     * <p>Headers set before this call are discarded. A {@code null} argument
     * is ignored and leaves the current headers untouched.
     *
     * @param headers the headers to copy from; may be {@code null}
     * @return this builder, for chaining
     */
    public PulsarMessageBuilder copyHeaders(@Nullable Map<String, Object> headers) {
        // The parameter is declared nullable, so a null map must not blow up.
        if (headers == null) {
            return this;
        }
        this.headers = CollectionUtils.newHashMap(headers.size());
        headers.forEach((key, value) -> {
            // Read-only headers (id / timestamp) must NOT be carried over;
            // everything else is copied.
            if (!isReadOnly(key)) {
                this.setHeader(key, value);
            }
        });
        return this;
    }

    /**
     * Tells whether a header is read-only, i.e. must not be copied by
     * {@link #copyHeaders(Map)}.
     *
     * @param headerName the header name to check
     * @return {@code true} if the name equals {@code MQConstant.Header.ID} or
     *         {@code MQConstant.Header.TIMESTAMP}
     */
    protected boolean isReadOnly(String headerName) {
        return MQConstant.Header.ID.equals(headerName) || MQConstant.Header.TIMESTAMP.equals(headerName);
    }

    /**
     * Sets a single header, overwriting any existing value for the same key.
     *
     * @param key   the header name
     * @param value the header value; may be {@code null}
     * @return this builder, for chaining
     */
    public PulsarMessageBuilder setHeader(@NotNull String key, @Nullable Object value) {
        if (this.headers == null) {
            this.headers = new HashMap<>(1 << 4);
        }
        this.headers.put(key, value);
        return this;
    }

    /**
     * Creates a {@link PulsarMessage} from the current payload and headers.
     *
     * <p>The message receives a snapshot of the headers, so later changes to
     * this builder do not affect messages that were already built.
     *
     * @return the new message
     */
    public PulsarMessage build() {
        Map<String, Object> snapshot = headers == null ? new HashMap<>() : new HashMap<>(headers);
        return new PulsarMessage(payload, snapshot);
    }

}
